S3のアクセスログをサーバレスに処理してAthenaで解析してみた
はじめに
AWSチームのすずきです。
APIの署名バージョン情報がログ項目として追加されたS3のサーバアクセスログ、CloudTrailより廉価な利用が可能ですが、
出力されるログファイル数の多さ、正規表現によるログのパースの困難さなどにより、Athenaなどを利用した解析が難しい課題がありました。
今回この解決のため、ログファイルのパースと集約をLambdaとFirehoseを利用して実施、Athenaによる解析処理が行いやすいログ形式への変換を実現しました。
また、後処理のLambdaを用いて署名バージョン2(SigV2)のアクセスログのみを抽出。移行対応が必要となるログが存在した場合、 CloudWatch Logs で簡単に確認可能とする仕組みも用意しました。
これらの仕組みと、展開するテンプレートを紹介させて頂きます。
構成図
設置方法
CloudFormation展開
-
AWSコンソールにログイン済みのブラウザから、展開リンクをクリックします。
-
IAMリソースの作成を承認をチェックし「作成」します。
- 5分程度でリソース設置が完了します。
リソース説明
S3(オリジナルアクセスログ)
s3://<スタック名>-accesslog-<リージョン>-<アカウントID>
- 調査対象とするS3アクセスログの出力先(ターゲットバケット)として利用します。
- 非圧縮、テキスト形式のS3アクセスログの一時的な置き場として利用します。
Lambda (前処理)
<スタック名>-LambdaFunction-xxxxxxx
- アクセスログを複数の正規表現を用いてパース後、JSON形式のログデータをFirehoseに投入します。
Firehose
- JSON形式のログデータを一定時間(600秒)バッファし、圧縮(GZ形式)して出力します。
S3(JSON変換済)
s3://<スタック名>-firehose-output-<リージョン>-<アカウントID>
- Athenaで解析可能なJSONファイルの保存場所となります。
- 非圧縮で100MBのアクセスログ、15MB程度に圧縮された状態で保存されます。
- 今回のテンプレートでは、以下のS3にGZ圧縮されたログが保存されます。
s3://<スタック名>-firehose-output-<リージョン>-<アカウントID>/firehose/s3_logs/<年>/<月>/<日>/<時間>/
- FirehoseでS3プリフィックス設定を追加する事で、年月日をAthenaのパーティションとして認識するApache Hiveフォーマットで保存する事も可能です。
- 2019年3月現在、CloudFormation未対応でした。
Lambda(後処理)
<スタック名>-LambdaFunction2-xxxxxxx
- S3のイベントトリガーで起動し、Firehoseより出力されたログファイルから、V2署名(SigV2)のログを抽出します。
- 集計、確認に必要な項目を、標準出力を利用してCloudWatchLogsに出力します。
- V2署名(SigV2)のアクセスログが少ない場合、簡易なログ確認手段となります。
- V2署名のログが多く、CloudWatch Logsの費用が問題となる場合や、CloudWatch Logs Insightsを利用した可視化の必要性がない場合には、後処理Lambdaのトリガーを無効にしてください。
CloudWatch Logs (Insights)
ロググループ 「/aws/lambda/<スタック名>-LambdaFunction2-xxxxxxx」
- V2署名のS3アクセスログが蓄積されます。
CloudWatch Logs Insights クエリ例
ロググループ 「/aws/lambda/<スタック名>-LambdaFunction2-xxxxxxx」 と、任意の期間を指定してクエリを実行します。
- ログ20件の確認
fields @message
| filter SignatureVersion = 'SigV2'
| limit 20
- IPアドレス、S3バケット、操作内容、ユーザエージェント別件数集計
stats count(*) by RemoteIP, Bucket, Operation, UserAgent
| filter SignatureVersion = 'SigV2'
Athena
JSON形式で集約したログ、Athenaを利用して解析する方法を紹介します。
テーブル作成
- Firehoseの出力S3より、解析対象となる年月日のパスを確認します。
- Athenaのテーブル作成時、確認したS3パスをロケーションとして指定してします。
- DB領域は「default」を利用しました。
CREATE EXTERNAL TABLE IF NOT EXISTS default.s3accesslog (
`Bucket` string,
`Timestamp` timestamp,
`RemoteIP` string,
`Requester` string,
`RequestID` string,
`Operation` string,
`Key` string,
`RequestURI` string,
`HTTPstatus` string,
`ErrorCode` string,
`BytesSent` string,
`ObjectSize` string,
`TotalTime` string,
`TurnAroundTime` string,
`Referrer` string,
`UserAgent` string,
`VersionId` string,
`HostId` string,
`SignatureVersion` string,
`CipherSuite` string,
`AuthenticationType` string,
`HostHeader` string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
'serialization.format' = '1'
) LOCATION 's3://<スタック名>-firehose-output-<リージョン>-<アカウントID>/firehose/s3_logs/<年>/<月>/<日>/'
TBLPROPERTIES ('has_encrypted_data'='false');
クエリ例
- 10件表示
SELECT * FROM "default"."s3accesslog" limit 10;
- 過去24時間に発生した、V2署名アクセス、接続元IP、IAM、バケット、ユーザエージェント別集計
SELECT count(1) AS cnt,
RemoteIP,
Requester,
Bucket ,
Operation,
UserAgent ,
SignatureVersion
FROM "default"."s3accesslog"
WHERE Timestamp > now() - interval '24' hour
AND SignatureVersion = 'SigV2'
GROUP BY RemoteIP, Requester, Bucket ,Operation, UserAgent , SignatureVersion
ORDER BY cnt DESC
件数が多い場合、RemoteIPなどを除外する事で、利用頻度の高いシステムを絞り込みやすくなります。
- 過去24時間に発生した、V2署名アクセス、バケット、ユーザエージェント別集計
SELECT count(1) AS cnt,
Bucket ,
UserAgent ,
SignatureVersion
FROM "default"."s3accesslog"
WHERE Timestamp > now() - interval '24' hour
AND SignatureVersion = 'SigV2'
GROUP BY Bucket ,Operation, UserAgent , SignatureVersion
ORDER BY cnt DESC
まとめ
LambdaとFirehoseにより、S3のアクセスログの解析が簡単に実施出来るようになりました。
CloudTrailのオブジェクトログと比較して、S3のアクセスログは廉価に利用する事が可能です。
大量のS3API利用があるシステムや、継続的なV2署名ログの確認が必要な場合、
今回紹介させて頂いた仕組みなどを参考に頂ければと思います。
今回、S3のアクセスログのパースに利用した正規表現は、先のブログ記事に頂いたコメントとリンク先の情報を踏襲させていただきました。
ume 様、ありがとうございました。
s3accesslog-sigv2
AWSTemplateFormatVersion: '2010-09-09'
Description: Convert S3 access log to JSON and output to Firehose with sigv2 check (20190324)
Parameters:
FirehoseExpiredate:
Description: Number of days to keep S3 file (Firehose)
Type: String
Default: 7
Resources:
S3bucketAccesslog:
DependsOn: LambdaInvokePermission
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
BucketName: !Sub '${AWS::StackName}-s3-accesslog-${AWS::Region}-${AWS::AccountId}'
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: 1
NotificationConfiguration:
LambdaConfigurations:
- Function: !GetAtt 'LambdaFunction.Arn'
Event: s3:ObjectCreated:*
LambdaInvokePermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt 'LambdaFunction.Arn'
Action: lambda:InvokeFunction
Principal: "s3.amazonaws.com"
SourceAccount: !Ref 'AWS::AccountId'
SourceArn: !Sub 'arn:aws:s3:::${AWS::StackName}-s3-accesslog-${AWS::Region}-${AWS::AccountId}'
LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${LambdaFunction}'
RetentionInDays: 7
LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-s3-accesslog-${AWS::Region}-${AWS::AccountId}/*'
- Effect: Allow
Action:
- firehose:PutRecordBatch
Resource: !GetAtt 'Deliverystream.Arn'
LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt 'LambdaRole.Arn'
Code:
ZipFile: !Sub |
import json
import re
from datetime import datetime
import boto3
import os
import urllib.parse
s3 = boto3.client('s3')
firehose = boto3.client('firehose')
RE_TEXT = r"""
^(?P<BucketOwner>\S+)\u0020
(?P<Bucket>\S+)\u0020
\[(?P<Timestamp>[^\]]+)\]\u0020
(?P<RemoteIP>\S+)\u0020
(?P<Requester>\S+)\u0020
(?P<RequestID>[0-9A-F]{16})\u0020
(?P<Operation>[A-Z0-9\.\_]+)\u0020
(?P<Key>\S+)\u0020
\"(?P<RequestURI>.*)\"\u0020
(?P<HTTPstatus>([0-9]{3}|-))\u0020
(?P<ErrorCode>([A-Za-z]+|-))\u0020
(?P<BytesSent>([0-9]+|-))\u0020
(?P<ObjectSize>([0-9]+|-))\u0020
(?P<TotalTime>([0-9]+|-))\u0020
(?P<TurnAroundTime>([0-9]+|-))\u0020
\"(?P<Referrer>.*)\"\u0020
\"(?P<UserAgent>.*)\"\u0020
(?P<VersionId>[A-Za-z0-9\_\.]+|-)
(\u0020(?P<SessionPrincipal>\S+))?
"""
RE_TEXT2 = r"""
\u0020
(?P<HostId>([A-Za-z0-9\/\+\=]+|-))\u0020
(?P<SignatureVersion>(SigV2|SigV4|-))\u0020
(?P<CipherSuite>[A-Z0-9\-]+)\u0020
(?P<AuthenticationType>(AuthHeader|QueryString|-))\u0020
(?P<HostHeader>\S+)
(?P<SslProtocol>\u0020.*)?
"""
RE_TEXT_NOQUOTE = RE_TEXT.replace("\\\"", "").replace(".*", r"\S*")
RE_FORMAT_V4 = re.compile(RE_TEXT + RE_TEXT2, flags=re.VERBOSE)
RE_FORMAT_V3 = re.compile(RE_TEXT, flags=re.VERBOSE)
RE_FORMAT_V2 = re.compile(RE_TEXT_NOQUOTE + RE_TEXT2, flags=re.VERBOSE)
def parse_line(line):
line = line.rstrip("\n")
m = RE_FORMAT_V4.match(line)
if not m:
m = RE_FORMAT_V3.match(line)
if not m:
m = RE_FORMAT_V2.match(line)
doc = m.groupdict()
doc.pop("SessionPrincipal", None)
dt = datetime.strptime(doc["Timestamp"], "%d/%b/%Y:%H:%M:%S %z")
doc.pop("Timestamp")
doc["@Timestamp"] = dt.strftime('%Y-%m-%d %H:%M:%S')
return doc
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
r = s3.get_object(Bucket=bucket, Key=key)
a = r['Body'].read().decode('utf-8','ignore').splitlines()
l = []
for b in a:
c = parse_line(b)
if len(c) > 1:
l.append(c)
if len(l) > 0:
put_log_firehose(l)
def log_parse(data):
a = data
z = {}
def put_log_firehose(data):
s = os.environ['firehose_stream_name']
b = []
for a in data:
b.append({'Data': json.dumps(a) + "\n"})
if len(b) > 200 or len(str(b)) > 20000:
r = firehose.put_record_batch(
DeliveryStreamName = s,
Records = b
)
print (json.dumps(r))
b = []
if len(b) > 0:
r = firehose.put_record_batch(
DeliveryStreamName = s,
Records = b
)
del r['RequestResponses']
print (json.dumps(r))
Runtime: python3.7
MemorySize: 128
Timeout: 300
Description: Convert S3 access log to JSON and output to Firehose
Environment:
Variables:
firehose_stream_name: !Ref 'Deliverystream'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Deliverystream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !Sub 'arn:aws:s3:::${S3bucketFirehose}'
BufferingHints:
IntervalInSeconds: '600'
SizeInMBs: '64'
CloudWatchLoggingOptions:
Enabled: true
LogGroupName: !Sub '/aws/firehose/${AWS::StackName}'
LogStreamName: 'firehose'
CompressionFormat: GZIP
Prefix: firehose/s3_logs/
RoleARN: !GetAtt 'DeliveryRole.Arn'
ProcessingConfiguration:
Enabled: 'false'
DeliveryRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Ref 'AWS::AccountId'
DeliveryPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: firehose_delivery_policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Sub 'arn:aws:s3:::${S3bucketFirehose}'
- !Sub 'arn:aws:s3:::${S3bucketFirehose}*'
Roles:
- !Ref 'DeliveryRole'
LogGroupFirehose:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/firehose/${AWS::StackName}'
RetentionInDays: !Ref 'FirehoseExpiredate'
S3bucketFirehose:
DependsOn: LambdaInvokePermission2
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
BucketName: !Sub '${AWS::StackName}-firehose-output-${AWS::Region}-${AWS::AccountId}'
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'FirehoseExpiredate'
NotificationConfiguration:
LambdaConfigurations:
- Function: !GetAtt 'LambdaFunction2.Arn'
Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: prefix
Value: firehose/
- Name: suffix
Value: gz
LambdaInvokePermission2:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !GetAtt 'LambdaFunction2.Arn'
Action: lambda:InvokeFunction
Principal: "s3.amazonaws.com"
SourceAccount: !Ref 'AWS::AccountId'
SourceArn: !Sub 'arn:aws:s3:::${AWS::StackName}-firehose-output-${AWS::Region}-${AWS::AccountId}'
LogGroupLambda2:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${LambdaFunction2}'
RetentionInDays: 7
LambdaRole2:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-firehose-output-${AWS::Region}-${AWS::AccountId}/*'
LambdaFunction2:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt 'LambdaRole2.Arn'
Code:
ZipFile: !Sub |
import json
import datetime
import boto3
import os
import urllib.parse
import gzip
s3 = boto3.client('s3')
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
r = s3.get_object(Bucket=bucket, Key=key)
a = gzip.decompress(r['Body'].read()).decode('utf-8').splitlines()
l = []
for b in a:
c = json.loads(b)
if 'SignatureVersion' in c:
if c["SignatureVersion"] == 'SigV2':
print_log_stdout(c)
def print_log_stdout(data):
a = data
z = {}
z["Bucket"] = a["Bucket"]
z["Timestamp"] = a["Timestamp"]
z["RemoteIP"] = a["RemoteIP"]
z["Operation"] = a["Operation"]
z["Key"] = a["Key"]
z["RequestURI"] = a["RequestURI"]
z["UserAgent"] = a["UserAgent"]
z["SignatureVersion"] = a["SignatureVersion"]
print (json.dumps(z))
Runtime: python3.7
MemorySize: 128
Timeout: 300
Description: SigV2 access extraction from S3 access log (JSON)
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'